Examples:: celery worker --app=proj -l info celery worker -A proj -l info -Q hipri,lopri celery worker -A proj --concurrency=4 celery worker -A proj --concurrency=1000 -P eventlet celery worker --autoscale=10,0 Options: -A APP, --app=APP app instance to use (e.g. module.attr_name) -b BROKER, --broker=BROKER url to broker. default is 'amqp://guest@localhost//' --loader=LOADER name of custom loader class to use. --config=CONFIG Name of the configuration module --workdir=WORKING_DIRECTORY Optional directory to change to after detaching. -C, --no-color -q, --quiet -c CONCURRENCY, --concurrency=CONCURRENCY Number of child processes processing the queue. The default is the number of CPUs available on your system. -P POOL_CLS, --pool=POOL_CLS Pool implementation: prefork (default), eventlet, gevent, solo or threads. --purge, --discard Purges all waiting tasks before the daemon is started. **WARNING**: This is unrecoverable, and the tasks will be deleted from the messaging server. -l LOGLEVEL, --loglevel=LOGLEVEL Logging level, choose between DEBUG, INFO, WARNING, ERROR, CRITICAL, or FATAL. -n HOSTNAME, --hostname=HOSTNAME Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname), %n (name) and %d, (domain). -B, --beat Also run the celery beat periodic task scheduler. Please note that there must only be one instance of this service. -s SCHEDULE_FILENAME, --schedule=SCHEDULE_FILENAME Path to the schedule database if running with the -B option. Defaults to celerybeat-schedule. The extension ".db" may be appended to the filename. Apply optimization profile. Supported: default, fair --scheduler=SCHEDULER_CLS Scheduler class to use. Default is celery.beat.PersistentScheduler -S STATE_DB, --statedb=STATE_DB Path to the state database. The extension '.db' may be appended to the filename. Default: None -E, --events Send events that can be captured by monitors like celery events, celerymon, and others. --time-limit=TASK_TIME_LIMIT Enables a hard time limit (in seconds int/float) for tasks. --soft-time-limit=TASK_SOFT_TIME_LIMIT Enables a soft time limit (in seconds int/float) for tasks. --maxtasksperchild=MAX_TASKS_PER_CHILD Maximum number of tasks a pool worker can execute before it's terminated and replaced by a new worker. -Q QUEUES, --queues=QUEUES List of queues to enable for this worker, separated by comma. By default all configured queues are enabled. Example: -Q video,image -X EXCLUDE_QUEUES, --exclude-queues=EXCLUDE_QUEUES -I INCLUDE, --include=INCLUDE Comma separated list of additional modules to import. Example: -I foo.tasks,bar.tasks --autoscale=AUTOSCALE Enable autoscaling by providing max_concurrency, min_concurrency. Example:: --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary) --autoreload Enable autoreloading. --no-execv Don't do execv after multiprocessing child fork. --without-gossip Do not subscribe to other workers events. --without-mingle Do not synchronize with other workers at startup. --without-heartbeat Do not send event heartbeats. --heartbeat-interval=HEARTBEAT_INTERVAL Interval in seconds at which to send worker heartbeat -O OPTIMIZATION -D, --detach -f LOGFILE, --logfile=LOGFILE Path to log file. If no logfile is specified, stderr is used. --pidfile=PIDFILE Optional file used to store the process pid. The program will not start if this file already exists and the pid is still alive. --uid=UID User id, or user name of the user to run as after detaching. --gid=GID Group id, or group name of the main group to change to after detaching. --umask=UMASK Effective umask (in octal) of the process after detaching. Inherits the umask of the parent process by default. --executable=EXECUTABLE Executable to use for the detached process. --version show program's version number and exit -h, --help show this help message and exit
假如有一个 taskA 去处理一个队列 A 中的信息,一个 taskB 去处理队列 B 中的数据,然后起了 x 个 worker 去处理队列 A ,其他的 worker 去处理队列 B。而这时也可能会出现队列 B 中一些 task 急需处理,而此时堆积在队列 B 中的 tasks 很多,需要耗费很长时间来处理队列 B 中的 task。此时就需要定义优先队列来处理紧急的 task。
不应该讲 Database objects 比如一个 User Model 传入在后台执行的任务,因为这些 object 可能包含过期的数据。相反应该传入一个 user id ,让 task 在执行过程中向数据库请求全新的 User Object。
尽量简化 tasks
task 应该简洁 (concise):
将主要 task 逻辑包含在对象方法或者方法中
确保方法抛出明确的异常 (identified exceptions)
只有在切当的时机再实现重试机制
假设需要实现一个发送邮件的 task:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
import requests from myproject.tasks import app # app is your celery application from myproject.exceptions import InvalidUserInput from utils.mail import api_send_mail
@app.task(bind=True, max_retries=3) defsend_mail(self, recipients, sender_email, subject, body): """Send a plaintext email with argument subject, sender and body to a list of recipients.""" try: data = api_send_mail(recipients, sender_email, subject, body) except InvalidUserInput: # No need to retry as the user provided an invalid input raise except Exception as exc: # Any other exception. Log the exception to sentry and retry in 10s. sentrycli.captureException() self.retry(countdown=10, exc=exc) return data
from myproject.tasks import app classBaseTask(app.Task): """Abstract base class for all tasks in my app.""" abstract = True defon_retry(self, exc, task_id, args, kwargs, einfo): """Log the exceptions to sentry at retry.""" sentrycli.captureException(exc) super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo) defon_failure(self, exc, task_id, args, kwargs, einfo): """Log the exceptions to sentry.""" sentrycli.captureException(exc) super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@app.task(bind=True, max_retries=3, soft_time_limit=5, base=BaseTask) defsend_mail(self, recipients, sender_email, subject, body): """Send a plaintext email with argument subject, sender and body to a list of recipients.""" try: data = api_send_mail(recipients, sender_email, subject, body) except InvalidUserInput: raise except Exception as exc: self.retry(countdown=backoff(self.request.retries), exc=exc) return data
functhreeSum(nums []int) [][]int { // 将数组进行从小到大的排序 sort.Ints(nums) iflen(nums) <= 2 { return [][]int{} } var result [][]int var left, right = 0, 0 for i := 0; i < len(nums); i ++ { // 如果最左侧的值大于0,则后面的值加起来肯定超过0,直接退出循环 if nums[i] > 0 { break } // 如果连续数值相同,则直接进行下一个 if i > 0 && nums[i] == nums[i-1] { continue }
left = i + 1 right = len(nums) - 1 for left < right { data := nums[i] + nums[left] + nums[right] // 如果正好等于0,则左右都移动,向中间逼近,并进行判重 if data == 0 { result = append(result, []int{nums[i], nums[left], nums[right]}) // 此处去重复,如果从左侧连着一样,直接去除掉 for left < right { if nums[left] == nums[left+1] { left ++ } else { break } } // 判断去重复后,直接左右向中间移动 left ++ right -- } // 如果小于0,则说明左侧值偏小,则左指针右移动 if data < 0 { left ++ } // 如果大于0,说明右侧值偏大,则指针左移动 if data > 0 { right -- } } } return result }
funcmain() { var testList []int = []int{-1,0,1,2,-1,-4} d := threeSum(testList) fmt.Println(d) }
import ( "fmt" "math" "strings" ) funcmyAtoi2(str string)int { //1. 去掉收尾空格 str = strings.TrimSpace(str) number := 0 flag := 1
for index, value := range str { // 如若是在0 - 9之间,则进行逐步计算 if value >= '0' && value <= '9' { number = number*10 + int(value-'0') fmt.Println(number,flag) } elseif value == '-' && index == 0 { // 如果是符号,但是如果不是第一位,则直接退出,如果是则记录符号 flag = -1 } elseif value == '+' && index == 0 { flag = 1 } else { // 如果都不在,则之间返回0 break } // 超阈值校验 if number > math.MaxInt32 { if flag == 1 { return math.MaxInt32 } else { return math.MinInt32 } } } return number * flag }
6. Z 字形变换 funcconvert(s string, numRows int)string { iflen(s) <= 1 { return s } // 初始化控制的索引和控制的标志 begin, flag := 0, -1 // 将每一行的字符进行拼接,然后最后将数组拼接 array := make([]string, len(s)) for _, value := range []rune(s) { array[begin] += string(value) if begin == 0 || begin == numRows -1 { // 如果是第一行和最后以后,在拼接完字符后,应该立马进行转化flag,反向操作 flag *= -1 } // 通过flag的改变控制下次拼接字符的索引,即行数 begin += flag } // 将数据进行拼接后返回 data := strings.Join(array, "") return data }
// 位图法 funclengthOfLongestSubStringBitMap(s string)(int, int, int) { iflen(s) == 0 { return0, 0, 0 } var BitMap [128]bool // 初始化目标值,左指针,右指针,开始位置与结束位置 var target, left, right, start, end = 0, 0, 0, 0, 0 for left < len(s) { // 遍历字符,只要不存在,则right一直加,否则,左指针移动 if BitMap[s[right]] { // 如果右指针存在值 // 左指针置为False BitMap[s[left]] = false left ++ } else { // 如果存在相同元素,则左指针右移,继续进行判断是否存在右指针元素重复 BitMap[s[right]] = true right ++ } // 计算最大长度并进行更新 if target < right - left { target = right - left } // 出口 if right >= len(s) || left + target >= len(s) { end = right start = end - target break }
} return target, start, end }
// hash法,字典方法,同上 funclengthOfLongestSubStringHash(s string)(int, int, int) { iflen(s) == 0 { return0, 0, 0 } var HashMap map[string]bool = map[string]bool{} var target, left, right, start, end = 0, 0, 0, 0, 0 for left < len(s) { // 遍历字符,只要不存在,则right一直加,否则,左指针移动 if HashMap[string(s[right])] { // 如果右指针存在值 // 左指针置为False HashMap[string(s[left])] = false left ++ } else { HashMap[string(s[right])] = true right ++ } if target < right - left { target = right - left } // 出口 if right >= len(s) || left + target >= len(s) { end = right start = end - target break } } return target, start, end }
// 滑动窗口法 funclengthOfLongestStringSplitWindows(s string)(int, int, int) { iflen(s) == 0 { return0, 0, 0 } var windows [128]int var target, left, right = 0, 0, 0 for left < len(s) { if windows[s[right] - 'a'] == 0 && right + 1 < len(s) { windows[s[right] - 'a'] ++ right ++ } else { windows[s[left] - 'a']-- left ++ } if target < right - left { target = right - left } }